import asyncio
from re import Pattern
from typing import Any, ClassVar, overload
from typing_extensions import TypeAlias, deprecated

class ChannelLayerManager:
    backends: dict[str, BaseChannelLayer]

    def __init__(self) -> None: ...
    @property
    def configs(self) -> dict[str, Any]: ...
    def make_backend(self, name: str) -> BaseChannelLayer: ...
    def make_test_backend(self, name: str) -> Any: ...
    def __getitem__(self, key: str) -> BaseChannelLayer: ...
    def __contains__(self, key: str) -> bool: ...
    def set(self, key: str, layer: BaseChannelLayer) -> BaseChannelLayer | None: ...

_ChannelCapacityPattern: TypeAlias = Pattern[str] | str
_ChannelCapacityDict: TypeAlias = dict[_ChannelCapacityPattern, int]
_CompiledChannelCapacity: TypeAlias = list[tuple[Pattern[str], int]]

class BaseChannelLayer:
    MAX_NAME_LENGTH: ClassVar[int] = 100
    expiry: int
    capacity: int
    channel_capacity: _ChannelCapacityDict
    channel_name_regex: Pattern[str]
    group_name_regex: Pattern[str]
    invalid_name_error: str

    def __init__(self, expiry: int = 60, capacity: int = 100, channel_capacity: _ChannelCapacityDict | None = None) -> None: ...
    def compile_capacities(self, channel_capacity: _ChannelCapacityDict) -> _CompiledChannelCapacity: ...
    def get_capacity(self, channel: str) -> int: ...
    @overload
    def match_type_and_length(self, name: str) -> bool: ...
    @overload
    def match_type_and_length(self, name: object) -> bool: ...
    @overload
    def require_valid_channel_name(self, name: str, receive: bool = False) -> bool: ...
    @overload
    def require_valid_channel_name(self, name: object, receive: bool = False) -> bool: ...
    @overload
    def require_valid_group_name(self, name: str) -> bool: ...
    @overload
    def require_valid_group_name(self, name: object) -> bool: ...
    @overload
    def valid_channel_names(self, names: list[str], receive: bool = False) -> bool: ...
    @overload
    def valid_channel_names(self, names: list[Any], receive: bool = False) -> bool: ...
    def non_local_name(self, name: str) -> str: ...
    async def send(self, channel: str, message: dict[str, Any]) -> None: ...
    async def receive(self, channel: str) -> dict[str, Any]: ...
    async def new_channel(self) -> str: ...
    async def flush(self) -> None: ...
    async def group_add(self, group: str, channel: str) -> None: ...
    async def group_discard(self, group: str, channel: str) -> None: ...
    async def group_send(self, group: str, message: dict[str, Any]) -> None: ...
    @deprecated("Use require_valid_channel_name instead.")
    def valid_channel_name(self, channel_name: str, receive: bool = False) -> bool: ...
    @deprecated("Use require_valid_group_name instead.")
    def valid_group_name(self, group_name: str) -> bool: ...

_InMemoryQueueData: TypeAlias = tuple[float, dict[str, Any]]

class InMemoryChannelLayer(BaseChannelLayer):
    channels: dict[str, asyncio.Queue[_InMemoryQueueData]]
    groups: dict[str, dict[str, float]]
    group_expiry: int

    def __init__(
        self,
        expiry: int = 60,
        group_expiry: int = 86400,
        capacity: int = 100,
        channel_capacity: _ChannelCapacityDict | None = None,
    ) -> None: ...

    extensions: list[str]

    async def send(self, channel: str, message: dict[str, Any]) -> None: ...
    async def receive(self, channel: str) -> dict[str, Any]: ...
    async def new_channel(self, prefix: str = "specific.") -> str: ...
    async def flush(self) -> None: ...
    async def close(self) -> None: ...
    async def group_add(self, group: str, channel: str) -> None: ...
    async def group_discard(self, group: str, channel: str) -> None: ...
    async def group_send(self, group: str, message: dict[str, Any]) -> None: ...

def get_channel_layer(alias: str = "default") -> BaseChannelLayer | None: ...

channel_layers: ChannelLayerManager
